package com.inyourcode.core.cluster;

import com.alibaba.fastjson.JSONObject;
import com.inyourcode.core.cluster.api.ClusterConst;
import com.inyourcode.core.cluster.api.IClusterNodeType;
import com.inyourcode.core.db.redis.RedisConfig;
import com.inyourcode.core.transport.api.UnresolvedAddress;
import com.inyourcode.core.transport.netty.JClusterTcpConnector;
import com.inyourcode.core.transport.netty.JNettyConnection;
import com.inyourcode.core.transport.netty.channel.NettyChannel;
import com.inyourcode.core.transport.rpc.ClusterClient;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 集群状态管理
 */
@Component
public class ClusterNodeManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterNodeManager.class);
    private static ClusterNodeManager instance;
    private ConcurrentHashMap<String, ClusterNodeConf> clusterNodeMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, ClusterConnector> clusterConnectorMap = new ConcurrentHashMap<>();
    @Qualifier(RedisConfig.STRING_REDIS_TEMPLATE)
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    ClusterNodeConf currentClusterNodeConf;
    Map<IClusterNodeType, ClusterClient> clusterClientMap = new HashMap<>();

    public static ClusterNodeManager getInstance() {
        return instance;
    }

    @PostConstruct
    void init() {
        instance = this;
        ClusterType[] clusterTypes = ClusterType.values();
        for (ClusterType clusterType : clusterTypes) {
            ClusterClient rpcClient = new ClusterClient(clusterType.getType());
            rpcClient.withConnector(new JClusterTcpConnector());
            clusterClientMap.put(clusterType, rpcClient);
        }
    }

    public ClusterNodeConf getClusterNode(String uuid) {
        ClusterNodeConf clusterNodeConf = clusterNodeMap.get(uuid);
        if (clusterNodeConf == null) {
            return null;
        }

        if (!clusterNodeConf.isActive()) {
            return null;
        }
        return clusterNodeConf;
    }

    public void incNodeLoad() {
        int old = this.currentClusterNodeConf.getCurrentLoad();
        this.currentClusterNodeConf.setCurrentLoad(old + 1);
    }

    public void deIncNodeLoad() {
        int old = this.currentClusterNodeConf.getCurrentLoad();
        this.currentClusterNodeConf.setCurrentLoad(old - 1);
    }

    public ClusterNodeConf getCurrentClusterNodeConf() {
        return currentClusterNodeConf;
    }

    /**
     * 选择负载最低的节点
     * @param type
     * @return
     */
    public ClusterNodeConf selectLowNode(IClusterNodeType type) {
        if (type == null) {
            return null;
        }

        ClusterNodeConf lowNode = null;
        for (ClusterNodeConf node : clusterNodeMap.values()) {
            if (!node.getNodeType().equals(type.getType())) {
                continue;
            }

            if (lowNode == null) {
               lowNode = node;
            } else {
                if (lowNode.getCurrentLoad() > node.getCurrentLoad()) {
                    lowNode = node;
                }
            }
        }

        if (lowNode == null) {
            return null;
        }

        return lowNode;
    }

    public void tick() {
        long currentTimeMillis = System.currentTimeMillis();
        currentClusterNodeConf.setLastActiveTimeMillis(currentTimeMillis);
        redisTemplate.opsForHash().put(ClusterConst.KEY_CLUSTER_DATA, currentClusterNodeConf.uniqueKey(), JSONObject.toJSONString(currentClusterNodeConf));

        Map<String, String> clusterDataFromRedis = redisTemplate.opsForHash().entries(ClusterConst.KEY_CLUSTER_DATA);
        if (CollectionUtils.isEmpty(clusterDataFromRedis)) {
            return;
        }

        for (Map.Entry<String, String> entry : clusterDataFromRedis.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            ClusterNodeConf nodeFromDB = JSONObject.parseObject(value, ClusterNodeConf.class);

            String uniqueId = nodeFromDB.uniqueKey();
            ClusterNodeConf existsNode = clusterNodeMap.get(uniqueId);
            //新加入的节点
            if (existsNode == null) {
                if (!nodeFromDB.isActive(currentTimeMillis)) {
                    clusterNodeMap.remove(uniqueId);
                    redisTemplate.opsForHash().delete(ClusterConst.KEY_CLUSTER_DATA, uniqueId);
                    LOGGER.error("Old node[{}] is not active.", nodeFromDB);
                    continue;
                }
                clusterNodeMap.put(uniqueId, nodeFromDB);
                LOGGER.info("New node[{}] is added to the cluster", nodeFromDB);
            } else {
                //检查是否存活
                if (!nodeFromDB.isActive(currentTimeMillis)) {
                    clusterNodeMap.remove(uniqueId);
                    redisTemplate.opsForHash().delete(ClusterConst.KEY_CLUSTER_DATA, uniqueId);
                    LOGGER.error("Old node[{}] has lost connection.", nodeFromDB);
                } else {
                    existsNode.updateInfo(nodeFromDB);
                }
            }

        }

        connectToOtherClusterNode();
    }

    /**
     * @param channel
     */
    public void removeClusterConnector(Channel channel){
        if (channel == null) {
            LOGGER.error("The remove failed, channel is null");
            return;
        }

        Collection<ClusterConnector> clusterConnectors = clusterConnectorMap.values();
        ClusterConnector loseConnector = null;
        for (ClusterConnector connector : clusterConnectors) {
            if (connector.channel == channel) {
                loseConnector = connector;
                break;
            }
        }

        if (loseConnector == null) {
            LOGGER.error("The remove failed, loseConnector not found, channel:{}", channel);
            return;
        }

        clusterConnectorMap.remove(loseConnector.conf.uniqueKey());
        LOGGER.error("The node was removed successfully, id:{}, channel:{}", loseConnector.conf.uniqueKey(), channel);
    }

    public ClusterConnector addClusterConnector(Channel channel) {
        Attribute<NettyChannel> attr = channel.attr(NettyChannel.NETTY_CHANNEL_KEY);
        NettyChannel nettyChannel = attr.get();
        if (nettyChannel == null) {
            LOGGER.error("Adding node connection information failed because the attr is null, channel:{}", channel);
            return null;
        }
        ClusterNodeConf otherClusterNode = (ClusterNodeConf)nettyChannel.attach();
        String uuid = otherClusterNode.uniqueKey();
        if (otherClusterNode == null) {
            LOGGER.error("Adding node connection information failed because the otherClusterNode is null, channel:{}, uuid:{}", channel, uuid);
            return null;
        }

        if (currentClusterNodeConf.getNodeType().equalsIgnoreCase(otherClusterNode.getNodeType()) ) {
//                otherClusterNode.hashCode() < currentClusterNodeConf.hashCode()
            LOGGER.warn("the same node unsupported,current:{}, other:{}", currentClusterNodeConf, otherClusterNode);
            return null;
        }

        ClusterClient rpcClient = clusterClientMap.get(ClusterType.getType(otherClusterNode.getNodeType()));
        if (rpcClient == null) {
            LOGGER.warn("The node connection failed and this node type[{}] cannot found", otherClusterNode.getNodeType());
            return null;
        }

        LOGGER.info("Adding node connection information success, channel:{}, uuid:{}", channel, uuid);
        return createClusterConnector(channel, otherClusterNode, rpcClient);
    }


    private void connectToOtherClusterNode(){
        Collection<ClusterNodeConf> otherClusterNodes = clusterNodeMap.values();
        otherClusterNodes.forEach(otherClusterNode -> {
            String key = otherClusterNode.uniqueKey();
            if (key.equals(currentClusterNodeConf.uniqueKey())) {
                return;
            }

            if (!currentClusterNodeConf.getJoinClustTypes().contains(otherClusterNode.getNodeType())) {
                return;
            }

            ClusterConnector clusterConnectorFromCache = clusterConnectorMap.get(key);
            if (clusterConnectorFromCache != null && clusterConnectorFromCache.hasConnected) {
                return;
            }

            if (currentClusterNodeConf.getNodeType().equalsIgnoreCase(otherClusterNode.getNodeType()) ) {
//                otherClusterNode.hashCode() < currentClusterNodeConf.hashCode()
                LOGGER.warn("the same node unsupported,current:{}, other:{}", currentClusterNodeConf, otherClusterNode);
                return;
            }

            LOGGER.info("Attempts to connect to the cluster node:{}", otherClusterNode);

            String[] split = otherClusterNode.getClusterIp().split(":");
            String host = split[0];
            int port = Integer.valueOf(split[1]);

            ClusterClient rpcClient = clusterClientMap.get(ClusterType.getType(otherClusterNode.getNodeType()));
            if (rpcClient == null) {
                LOGGER.warn("The node connection failed and this node type[{}] cannot found", otherClusterNode.getNodeType());
                return;
            }

            UnresolvedAddress unresolvedAddress = new UnresolvedAddress(host, port);
            JNettyConnection jConnection = (JNettyConnection) rpcClient.connector().connect(unresolvedAddress, true, otherClusterNode);
            jConnection.setReconnect(false);

        });
    }

    public ClusterConnector createClusterConnector(Channel channel, ClusterNodeConf otherClusterNode, ClusterClient rpcClient) {
        ClusterConnector connector = new ClusterConnector();
        connector.conf = otherClusterNode;
        connector.hasConnected = true;
        connector.channel = channel;
        connector.rpcClient = rpcClient;
        clusterConnectorMap.put(otherClusterNode.uniqueKey(), connector);
        return  connector;
    }

    public static class ClusterConnector {
        private ClusterNodeConf conf;
        private Channel channel;
        private ClusterClient rpcClient;
        private volatile boolean hasConnected;

        public ClusterClient getRpcClient() {
            return rpcClient;
        }

        public void setRpcClient(ClusterClient rpcClient) {
            this.rpcClient = rpcClient;
        }

        public boolean isHasConnected() {
            return hasConnected;
        }

        public void setHasConnected(boolean hasConnected) {
            this.hasConnected = hasConnected;
        }

        public ClusterNodeConf getConf() {
            return conf;
        }

        public void setConf(ClusterNodeConf conf) {
            this.conf = conf;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ClusterConnector that = (ClusterConnector) o;
            return Objects.equals(conf, that.conf);
        }

        @Override
        public int hashCode() {
            return Objects.hash(conf);
        }
    }

    public String displayClusterInfo() {
        Collection<ClusterConnector> clusterConnectors = clusterConnectorMap.values();
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("\n\tcluster info:\n");
        String colNodeId = String.format("%20s","nodeId");
        String nodeType = String.format("|%20s","nodeType");
        String colClusterIp = String.format("|%20s","clusterIp");
        String colLoad = String.format("|%5s","load");
        String colActTime = String.format("|%20s","activeTime");
        stringBuilder.append(colNodeId).
                append(nodeType).
                append(colClusterIp).
                append(colLoad).
                append(colActTime).
                append("\n");

        for (ClusterConnector connector : clusterConnectors) {
            ClusterNodeConf clusterNodeConf = clusterNodeMap.get(connector.getConf().uniqueKey());
            if (clusterNodeConf == null) {
                continue;
            }
            stringBuilder.append(String.format("%20s|",clusterNodeConf.getNodeId()));
            stringBuilder.append(String.format("%20s|", clusterNodeConf.getNodeType()));
            stringBuilder.append(String.format("%20s|", clusterNodeConf.getClusterIp()));
            stringBuilder.append(String.format("%5s|", clusterNodeConf.getCurrentLoad()));
            stringBuilder.append(String.format("%20s", new Date(clusterNodeConf.getLastActiveTimeMillis())));
            stringBuilder.append("\n");
        }
        stringBuilder.append("\n");
        return stringBuilder.toString();
    }
}


